package com.nx.arch.transactionmsg;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.SendStatus;
import com.alibaba.rocketmq.common.message.Message;
import com.nx.arch.addon.lock.NxLock;
import com.nx.arch.addon.lock.NxLockClient;
import com.nx.arch.transactionmsg.model.Msg;
import com.nx.arch.transactionmsg.model.MsgInfo;
import com.nx.arch.transactionmsg.model.State;
import com.nx.arch.transactionmsg.model.TxMsgDataSource;
import com.nx.arch.transactionmsg.utils.Config;
import com.nx.arch.transactionmsg.utils.ConfigUtil;
import com.nx.arch.transactionmsg.utils.ConfigUtil.DbInfo;
import com.nx.arch.transactionmsg.utils.ConfigUtil.ServerType;
import com.nx.arch.transactionmsg.utils.NxThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sql.DataSource;
import java.io.UnsupportedEncodingException;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

/**
 * @类名称 MsgProcessor.java
 * @类描述 事务消息处理类：控制事务消息的投递
 * <pre>
 * 主要思想,一个按照创建时间排序的msg Queue，一个按照下次超时时间排序的时间轮队列.
 * 由定时线程扫描时间轮，重新放入msg Queue中，超过6次，直接丢弃，由离线的定时扫描做重试
 * 只有从DB里面查到了，才说明事务提交了
 * </pre>
 * @作者  庄梦蝶殇 linhuaichuan@naixuejiaoyu.com
 * @创建时间 2020年4月14日 下午3:18:40
 * @版本 1.0.0
 *
 * @修改记录
 * <pre>
 *     版本                       修改人 		修改日期 		 修改内容描述
 *     ----------------------------------------------
 *     1.0.0 		庄梦蝶殇 	2020年4月14日             
 *     ----------------------------------------------
 * </pre>
 */
public class MsgProcessor {
    /**
     * ：100ms
     */
    private static final int DEF_TIMEOUT_MS = 100;
    
    /**
     * 处理间隔数组(6次): 0s, 5s, 10s, 25s, 50s, 100s
     */
    private static final int[] TIMEOUT_DATA = new int[] {0, 5, 10, 25, 50, 100};
    
    /**
     * 最大处理次数
     */
    private static final int MAX_DEAL_TIME = 6;
    
    /**
     * 等待消息分页
     */
    private static final int LIMIT_NUM = 50;
    
    private static final int MAX_DEAL_NUM_ONE_TIME = 2000;
    
    /**
     * 时间论转动频率
     */
    private static final int TIME_WHEEL_PERIOD = 5;
    
    /**
     * etcd集群信息
     */
    private static String[] defaultEtcd = {"http://etcd1.lock.nxinc.com:2379", "http://etcd2.lock.nxinc.com:2379", "http://etcd3.lock.nxinc.com:2379"};
    
    private static boolean envNeedLock = false;
    
    private static final int HOLD_LOCK_TIME = 60;
    
    /**
     * 沙箱次数
     */
    private static final int SANBOX_TIMES = 3;
    
    /**
     * 锁 初始延迟时间
     */
    private static final int CHECK_LOCK_INIT_DELAY = 10;
    
    private static final Logger LOGGER = LoggerFactory.getLogger(MsgProcessor.class);
    
    /**
     * 事务操作消息队列
     */
    private PriorityBlockingQueue<Msg> msgQueue;
    
    /**
     * 事务消息执行线程池
     */
    private ExecutorService exeService;
    
    /**
     * 时间轮检查队列：时间轮投递(事务操作)
     */
    private PriorityBlockingQueue<Msg> timeWheel;
    
    /**
     * 定时线程池：其他线程
     */
    private ScheduledExecutorService scheService;
    
    /**
     * 事务消息服务状态
     */
    private AtomicReference<State> state;
    
    /**
     * 消息生产者
     */
    private DefaultMQProducer producer;
    
    /**
     * 事务消息das
     */
    private MsgStorage msgStorage;
    
    /**
     * 配置信息
     */
    private Config config;
    
    /**
     * 锁客户端
     */
    private NxLockClient nxLockClient;
    
    /**
     * 持锁标识:防止多个服务同时操作
     */
    private volatile boolean holdLock = true;
    
    /**
     * 默认资源锁key
     */
    private String lockKey = "defaultTransKey";
    
    /**
     * 环境类型
     */
    private ServerType serverType = null;
    
    public MsgProcessor(DefaultMQProducer producer, MsgStorage msgStorage) {
        this.producer = producer;
        this.msgStorage = msgStorage;
        msgQueue = new PriorityBlockingQueue<Msg>(5000, new Comparator<Msg>() {
            @Override
            public int compare(Msg o1, Msg o2) {
                long diff = o1.getCreateTime() - o2.getCreateTime();
                if (diff > 0) {
                    return 1;
                } else if (diff < 0) {
                    return -1;
                }
                return 0;
            }
        });
        timeWheel = new PriorityBlockingQueue<Msg>(1000, new Comparator<Msg>() {
            @Override
            public int compare(Msg o1, Msg o2) {
                long diff = o1.getNextExpireTime() - o2.getNextExpireTime();
                if (diff > 0) {
                    return 1;
                } else if (diff < 0) {
                    return -1;
                }
                return 0;
            }
        });
        state = new AtomicReference<State>(State.CREATE);
    }
    
    /**
     * @方法名称 init
     * @功能描述 init 初始化，config才是ok
     * @param config 配置对象
     */
    public void init(Config config) {
        if (state.get().equals(State.RUNNING)) {
            LOGGER.info("Msg Processor have inited return");
            return;
        }
        LOGGER.info("MsgProcessor init start");
        state.compareAndSet(State.CREATE, State.RUNNING);
        // 1、设置环境
        this.serverType = ConfigUtil.getServerType();
        if (config.getEtcdHosts() != null && config.getEtcdHosts().length >= 1) {
            envNeedLock = true;
            defaultEtcd = config.getEtcdHosts();
            LOGGER.info("serverType {} envNeedLock {} etcdhosts {}", serverType, envNeedLock, defaultEtcd);
        }
        // 2、设置配置
        this.config = config;
        // 3、沙箱特殊处理
        if (ServerType.Sandbox.equals(serverType)) {
            this.config.setDeleteTimePeriod(this.config.getDeleteTimePeriod() * SANBOX_TIMES);
            this.config.setSchedScanTimePeriod(this.config.getSchedScanTimePeriod() * SANBOX_TIMES);
        }
        // 4、设置 事务消息处理线程数
        exeService = Executors.newFixedThreadPool(config.getThreadNum(), new NxThreadFactory("MsgProcessorThread-"));
        for (int i = 0; i < config.getThreadNum(); i++) {
            exeService.submit(new MsgDeliverTask());
        }
        // 5、设置 其他线程
        scheService = Executors.newScheduledThreadPool(config.getSchedThreadNum(), new NxThreadFactory("MsgScheduledThread-"));
        // 设置时间转动线程:时间轮重试投递失败的事务操作
        scheService.scheduleAtFixedRate(new TimeWheelTask(), TIME_WHEEL_PERIOD, TIME_WHEEL_PERIOD, TimeUnit.MILLISECONDS);
        // 设置事务消息删除线程
        scheService.scheduleAtFixedRate(new CleanMsgTask(), config.deleteTimePeriod, config.deleteTimePeriod, TimeUnit.SECONDS);
        // 设置 补漏线程:防止最近10分钟的线程被漏提交
        scheService.scheduleAtFixedRate(new ScanMsgTask(), config.schedScanTimePeriod, config.schedScanTimePeriod, TimeUnit.SECONDS);
        // 设置心跳线程：汇报 事务提交队列的堆积情况
        scheService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("stats info msgQueue size {} timeWheelQueue size {}", msgQueue.size(), timeWheel.size());
            }
        }, 20, config.getStatsTimePeriod(), TimeUnit.SECONDS);
        // 6、初始化锁客户端
        initLock();
        LOGGER.info("MsgProcessor init end");
    }
    
    /**
     * @方法名称 initLock
     * @功能描述 初始化锁客户端
     */
    private void initLock() {
        // 测试环境没有配置etcd 或者是沙箱环境 默认不用修改holdLock
        if ((ServerType.TestServer.equals(serverType) && !envNeedLock) || ServerType.Sandbox.equals(serverType)) {
            return;
        }
        List<TxMsgDataSource> list = this.msgStorage.getDbDataSources();
        String clusterName = this.config.getServiceName();
        if (list != null && list.size() > 0) {
            String url = list.get(0).getUrl();
            DbInfo db = ConfigUtil.getDataBase(url);
            clusterName = clusterName + "with" + db.getDataBase();
            String key = db.buildKey();
            if (key != null && key.length() > 0) {
                lockKey = key;
            }
        }
        LOGGER.info("init lock clustername {} lockKey {} etcdString {}", clusterName, lockKey, Arrays.toString(defaultEtcd));
        try {
            nxLockClient = NxLock.factory().etcdSolution().connectionString(defaultEtcd).clusterName(clusterName).build();
            Thread thread = new Thread(new KeepLockTask(), "KeepLockThread");
            thread.setDaemon(true);
            thread.start();
            LOGGER.info("init lock success");
        } catch (Exception e) {
            LOGGER.info("init lock error {}", e.getMessage());
        }
    }
    
    /**
     * @方法名称 close
     * @功能描述 关闭事务消息服务
     */
    public void close() {
        LOGGER.info("start close msgProcessor ");
        state.compareAndSet(State.RUNNING, State.CLOSED);
        try {
            exeService.awaitTermination(config.getCloseWaitTime(), TimeUnit.MILLISECONDS);
            scheService.awaitTermination(config.getCloseWaitTime(), TimeUnit.MILLISECONDS);
            this.nxLockClient.close();
        } catch (Exception e) {
            LOGGER.info("close MsgProcessor error {}", e.getMessage());
        }
        if (!exeService.isShutdown()) {
            exeService.shutdownNow();
        }
        if (!scheService.isShutdown()) {
            scheService.shutdownNow();
        }
        LOGGER.info("close msgProcessor success");
    }
    
    /**
     * @方法名称 putMsg
     * @功能描述 
     * @param msg
     */
    protected void putMsg(Msg msg) {
        msgQueue.put(msg);
    }
    
    /**
     * @方法名称 buildMsg
     * @功能描述 生成MQ投递消息
     * @param msgInfo 事务消息对象
     * @return MQ消息
     */
    private static Message buildMsg(final MsgInfo msgInfo)
        throws UnsupportedEncodingException {
        // 组装消息
        String topic = msgInfo.getTopic();
        String tag = msgInfo.getTag();
        String content = msgInfo.getContent();
        String id = msgInfo.getId() + "";
        Message msg = new Message(topic, tag, id, content.getBytes("UTF-8"));
        String header = String.format("{\"topic\":\"%s\",\"tag\":\"%s\",\"id\":\"%s\",\"createTime\":\"%s\"}", topic, tag, id, System.currentTimeMillis());
        msg.putUserProperty("MQHeader", header);
        // 设置延迟消息
        // long createMs = msgInfo.getCreate_time().getTime();
        // int diff = (int)((System.currentTimeMillis() - createMs) / 1000);
        // int latestDelay = msgInfo.getDelay() - diff;
        // if (latestDelay > 0) {
        // msg.setDelayTime(latestDelay, TimeUnit.SECONDS);
        // }
        return msg;
    }
    
    /**
     * @方法名称 releaseLock
     * @功能描述 释放锁
     * @param lock 锁对象
     */
    private static void releaseLock(NxLock lock) {
        try {
            if (lock != null) {
                lock.close();
            }
        } catch (Exception e) {
            LOGGER.error("close lock fail {} msg {}", lock, e.getMessage());
        }
    }
    
    /**
     * @类名称 MsgDeliverTask.java
     * @类描述 事务投递任务：将已提交的事务消息 发布到MQ
     * @版本 1.0.0
     *
     * @修改记录
     * <pre>
     *     版本                       修改人 		修改日期 		 修改内容描述
     *     ----------------------------------------------
     *     1.0.0 		庄梦蝶殇 	2020年4月14日             
     *     ----------------------------------------------
     * </pre>
     */
    class MsgDeliverTask implements Runnable {
        @Override
        public void run() {
            while (true) {
                if (!state.get().equals(State.RUNNING)) {
                    break;
                }
                try {
                    // 1、每100ms从 队列 弹出一条事务操作消息
                    Msg msg = null;
                    try {
                        msg = msgQueue.poll(DEF_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException ex) {
                    }
                    if (msg == null) {
                        continue;
                    }
                    LOGGER.debug("poll msg {}", msg);
                    int dealedTime = msg.getHaveDealedTimes() + 1;
                    msg.setHaveDealedTimes(dealedTime);
                    // 2、从db获取实际事务消息(这里我们不知道是否事务已经提交，所以需要从DB里面拿)
                    MsgInfo msgInfo = msgStorage.getMsgById(msg);
                    LOGGER.debug("getMsgInfo from DB {}", msgInfo);
                    if (msgInfo == null) {
                        if (dealedTime < MAX_DEAL_TIME) {
                            // 3.1、加入时间轮转动队列:重试投递
                            long nextExpireTime = System.currentTimeMillis() + TIMEOUT_DATA[dealedTime];
                            msg.setNextExpireTime(nextExpireTime);
                            timeWheel.put(msg);
                            LOGGER.debug("put msg in timeWhellQueue {} ", msg);
                        }
                    } else {
                        // 3.2、投递事务消息
                        Message mqMsg = buildMsg(msgInfo);
                        LOGGER.debug("will sendMsg {}", mqMsg);
                        SendResult result = producer.send(mqMsg);
                        LOGGER.info("msgId {} topic {} tag {} sendMsg result {}", msgInfo.getId(), mqMsg.getTopic(), mqMsg.getTags(), result);
                        if (null == result || result.getSendStatus() != SendStatus.SEND_OK) {
                            // 投递失败，重入时间轮
                            if (dealedTime < MAX_DEAL_TIME) {
                                long nextExpireTime = System.currentTimeMillis() + TIMEOUT_DATA[dealedTime];
                                msg.setNextExpireTime(nextExpireTime);
                                timeWheel.put(msg);
                                // 这里可以优化 ，因为已经确认事务提交了，可以从DB中拿到了
                                LOGGER.debug("put msg in timeWhellQueue {} ", msg);
                            }
                        } else if (result.getSendStatus() == SendStatus.SEND_OK) {
                            // 投递成功，修改数据库的状态(标识已提交)
                            int res = msgStorage.updateSendMsg(msg);
                            LOGGER.debug("msgId {} updateMsgStatus success res {}", msgInfo.getId(), res);
                        }
                    }
                } catch (Throwable t) {
                    LOGGER.error("MsgProcessor deal msg fail", t);
                }
            }
        }
    }
    
    /**
     * @类描述 时间轮转动线程：重试投递失败的事务操作(未超时)
     * @版本 1.0.0
     *
     * @修改记录
     * <pre>
     *     版本                       修改人 		修改日期 		 修改内容描述
     *     ----------------------------------------------
     *     1.0.0 		庄梦蝶殇 	2020年4月14日             
     *     ----------------------------------------------
     * </pre>
     */
    class TimeWheelTask implements Runnable {
        @Override
        public void run() {
            try {
                if (state.get().equals(State.RUNNING)) {
                    long cruTime = System.currentTimeMillis();
                    Msg msg = timeWheel.peek();
                    // 拿出来的时候有可能还没有超时
                    while (msg != null && msg.getNextExpireTime() <= cruTime) {
                        msg = timeWheel.poll();
                        LOGGER.debug("timeWheel poll msg ,return to msgQueue {}", msg);
                        // 重新放进去
                        msgQueue.put(msg);
                        msg = timeWheel.peek();
                    }
                }
            } catch (Exception ex) {
                LOGGER.error("pool timequeue error", ex);
            }
        }
    }
    
    /**
     * @类描述 事务消息删除线程：删除三天之前的发送成功的消息
     * @版本 1.0.0
     *
     * @修改记录
     * <pre>
     *     版本                       修改人 		修改日期 		 修改内容描述
     *     ----------------------------------------------
     *     1.0.0 		庄梦蝶殇 	2020年4月14日             
     *     ----------------------------------------------
     * </pre>
     */
    class CleanMsgTask implements Runnable {
        @Override
        public void run() {
            if (state.get().equals(State.RUNNING)) {
                LOGGER.debug("DeleteMsg start run");
                try {
                    Iterator<DataSource> it = msgStorage.getDataSourcesMap().values().iterator();
                    while (it.hasNext()) {
                        DataSource dataSrc = it.next();
                        if (holdLock) {
                            LOGGER.info("DeleteMsgRunnable run ");
                            int count = 0;
                            int num = config.deleteMsgOneTimeNum;
                            // 影响行数 不等于 删除数 及 大于最大删除数时，本次task结束
                            while (num == config.deleteMsgOneTimeNum && count < MAX_DEAL_NUM_ONE_TIME) {
                                try {
                                    num = msgStorage.deleteSendedMsg(dataSrc, config.deleteMsgOneTimeNum);
                                    count += num;
                                } catch (SQLException e) {
                                    LOGGER.error("deleteSendedMsg fail ", e);
                                }
                            }
                        }
                    }
                } catch (Exception ex) {
                    LOGGER.error("delete Run error ", ex);
                }
            }
        }
    }
    
    /**
     * @类描述 补漏线程:扫描最近10分钟未提交事务消息,防止各种场景的消息丢失
     * @版本 1.0.0
     *
     * @修改记录
     * <pre>
     *     版本                       修改人 		修改日期 		 修改内容描述
     *     ----------------------------------------------
     *     1.0.0 		庄梦蝶殇 	2020年4月14日             
     *     ----------------------------------------------
     * </pre>
     */
    class ScanMsgTask implements Runnable {
        @Override
        public void run() {
            if (state.get().equals(State.RUNNING)) {
                LOGGER.debug("SchedScanMsg start run");
                Iterator<DataSource> it = msgStorage.getDataSourcesMap().values().iterator();
                while (it.hasNext()) {
                    DataSource dataSrc = it.next();
                    boolean canExe = holdLock;
                    if (canExe) {
                        LOGGER.info("SchedScanMsgRunnable run");
                        int num = LIMIT_NUM;
                        int count = 0;
                        while (num == LIMIT_NUM && count < MAX_DEAL_NUM_ONE_TIME) {
                            try {
                                List<MsgInfo> list = msgStorage.getWaitingMsg(dataSrc, LIMIT_NUM);
                                num = list.size();
                                if (num > 0) {
                                    LOGGER.debug("scan db get msg size {} ", num);
                                }
                                count += num;
                                for (MsgInfo msgInfo : list) {
                                    try {
                                        Message mqMsg = buildMsg(msgInfo);
                                        SendResult result = producer.send(mqMsg);
                                        LOGGER.info("msgId {} topic {} tag {} sendMsg result {}", msgInfo.getId(), mqMsg.getTopic(), mqMsg.getTags(), result);
                                        if (result != null && result.getSendStatus() == SendStatus.SEND_OK) {
                                            // 修改数据库的状态
                                            int res = msgStorage.updateMsgStatus(dataSrc, msgInfo.getId());
                                            LOGGER.debug("msgId {} updateMsgStatus success res {}", msgInfo.getId(), res);
                                        }
                                    } catch (Exception e) {
                                        LOGGER.error("SchedScanMsg deal fail", e);
                                    }
                                }
                            } catch (SQLException e) {
                                LOGGER.error("getWaitMsg fail", e);
                            }
                        }
                    }
                }
            }
        }
        
    }
    
    /**
     * @类描述 分布式抢锁线程
     * @版本 1.0.0
     *
     * @修改记录
     * <pre>
     *     版本                       修改人 		修改日期 		 修改内容描述
     *     ----------------------------------------------
     *     1.0.0 		庄梦蝶殇 	2020年4月14日             
     *     ----------------------------------------------
     * </pre>
     */
    class KeepLockTask implements Runnable {
        @Override
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(CHECK_LOCK_INIT_DELAY);
                LOGGER.info("keep lock run start");
                while (state.get().equals(State.RUNNING)) {
                    NxLock lock = null;
                    try {
                        LOGGER.info("keeplock start run");
                        lock = nxLockClient.newLock(lockKey);
                        if (lock.acquire(HOLD_LOCK_TIME)) {
                            holdLock = true;
                            LOGGER.info("change holdLock to true");
                        } else {
                            holdLock = false;
                        }
                        TimeUnit.SECONDS.sleep(HOLD_LOCK_TIME);
                    } catch (Exception ex) {
                        LOGGER.error("keep lock error {} ", ex.getMessage());
                    } finally {
                        releaseLock(lock);
                    }
                }
            } catch (Exception ex) {
                LOGGER.error("keep lock run error", ex);
            }
            LOGGER.info("keep lock run end");
        }
    }
}
